package drds.data_propagate.parse;

import drds.data_propagate.binlog.AbstractFetcher;
import drds.data_propagate.driver.socket.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedByInterruptException;

/**
 * 基于socket的logEvent实现
 *
 * @author jianghang 2013-1-14 下午07:39:30
 * @version 1.0.0
 */
public class Fetcher extends AbstractFetcher {

    // Master heartbeat interval
    public static final int MASTER_HEARTBEAT_PERIOD_SECONDS = 15;
    /**
     * Command to dump binlog
     */
    public static final byte COM_BINLOG_DUMP = 18;
    /**
     * Packet headerPacket sizes
     */
    public static final int NET_HEADER_SIZE = 4;
    public static final int SQLSTATE_LENGTH = 5;
    /**
     * Packet offsets
     */
    public static final int PACKET_LEN_OFFSET = 0;
    public static final int PACKET_SEQ_OFFSET = 3;
    /**
     * Maximum packet length
     */
    public static final int MAX_PACKET_LENGTH = (256 * 256 * 256 - 1);
    protected static final Logger logger = LoggerFactory.getLogger(Fetcher.class);
    // +10s 确保 timeout > heartbeat interval
    private static final int READ_TIMEOUT_MILLISECONDS = (MASTER_HEARTBEAT_PERIOD_SECONDS + 10) * 1000;
    private SocketChannel channel;

    private boolean issemi = false;

    // private BufferedInputStream input;

    public Fetcher() {
        super(DEFAULT_INITIAL_CAPACITY, DEFAULT_GROWTH_FACTOR);
    }

    public Fetcher(final int initialCapacity) {
        super(initialCapacity, DEFAULT_GROWTH_FACTOR);
    }

    public Fetcher(final int initialCapacity, final float growthFactor) {
        super(initialCapacity, growthFactor);
    }

    public void start(SocketChannel channel) throws IOException {
        this.channel = channel;
        String dbsemi = System.getProperty("db.semi");
        if ("1".equals(dbsemi)) {
            issemi = true;
        }
    }

    /**
     * {@inheritDoc}
     *
     * @see AbstractFetcher#fetchNextBinlogEvent()
     */
    public boolean fetchNextBinlogEvent() throws IOException {
        try {
            // Fetching packet headerPacket from input.
            if (!fetch0(0, NET_HEADER_SIZE)) {
                logger.warn("Reached end of input stream while fetching headerPacket");
                return false;
            }

            // Fetching the first packet(may a multi-packet).
            int netlen = getUint24(PACKET_LEN_OFFSET);
            int netnum = getUint8(PACKET_SEQ_OFFSET);
            if (!fetch0(NET_HEADER_SIZE, netlen)) {
                logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen);
                return false;
            }

            // Detecting error code.
            final int mark = getUint8(NET_HEADER_SIZE);
            if (mark != 0) {
                if (mark == 255) // error from master
                {
                    // Indicates an error, for example trying to fetchNextBinlogEvent from
                    // wrong
                    // binlog position.
                    position = NET_HEADER_SIZE + 1;
                    final int errno = getInt16();
                    String sqlstate = forward(1).getFixString(SQLSTATE_LENGTH);
                    String errmsg = getFixString(limit - position);
                    throw new IOException("Received error packet:" + " errno = " + errno + ", sqlstate = " + sqlstate
                            + " errmsg = " + errmsg);
                } else if (mark == 254) {
                    // Indicates end of stream. It's not clear when this would
                    // be sent.
                    logger.warn("Received EOF packet from server, apparent"
                            + " master disconnected. It's may be duplicate slaveId , check instance config");
                    return false;
                } else {
                    // Should not happen.
                    throw new IOException("Unexpected response " + mark + " while fetching binlog: packet #" + netnum
                            + ", len = " + netlen);
                }
            }

            // if mysql is in semi mode
            if (issemi) {
                // parse semi mark
                int semimark = getUint8(NET_HEADER_SIZE + 1);
                int semival = getUint8(NET_HEADER_SIZE + 2);
                this.semival = semival;
            }

            // The first packet is a multi-packet, concatenate the packets.
            while (netlen == MAX_PACKET_LENGTH) {
                if (!fetch0(0, NET_HEADER_SIZE)) {
                    logger.warn("Reached end of input stream while fetching headerPacket");
                    return false;
                }

                netlen = getUint24(PACKET_LEN_OFFSET);
                netnum = getUint8(PACKET_SEQ_OFFSET);
                if (!fetch0(limit, netlen)) {
                    logger.warn("Reached end of input stream: packet #" + netnum + ", len = " + netlen);
                    return false;
                }
            }

            // Preparing bytes variables to decoding.
            if (issemi) {
                origin = NET_HEADER_SIZE + 3;
            } else {
                origin = NET_HEADER_SIZE + 1;
            }
            position = origin;
            limit -= origin;
            return true;
        } catch (SocketTimeoutException e) {
            close(); /* Do cleanup */
            logger.error("Socket timeout expired, closing connection", e);
            throw e;
        } catch (InterruptedIOException e) {
            close(); /* Do cleanup */
            logger.info("I/O interrupted while reading from client socket", e);
            throw e;
        } catch (ClosedByInterruptException e) {
            close(); /* Do cleanup */
            logger.info("I/O interrupted while reading from client socket", e);
            throw e;
        } catch (IOException e) {
            close(); /* Do cleanup */
            logger.error("I/O error while reading from client socket", e);
            throw e;
        }
    }

    private final boolean fetch0(final int off, final int len) throws IOException {
        ensureCapacity(off + len);

        // byte[] read = channel.read(len, READ_TIMEOUT_MILLISECONDS);
        // System.arraycopy(read, 0, this.bytes, off, len);

        channel.read(bytes, off, len, READ_TIMEOUT_MILLISECONDS);
        if (limit < off + len) {
            limit = off + len;
        }
        return true;
    }

    /**
     * {@inheritDoc}
     *
     * @see AbstractFetcher#close()
     */
    public void close() throws IOException {
        // do nothing
    }

}
